// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package xform

import (
	"github.com/cockroachdb/cockroach/pkg/sql/opt"
	"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
	"github.com/cockroachdb/cockroach/pkg/sql/opt/ordering"
	"github.com/cockroachdb/cockroach/pkg/sql/opt/props/physical"
	"github.com/cockroachdb/errors"
)

// IsCanonicalSetOp reports whether the given private belongs to the canonical
// form of a set operator, i.e. the operator that is built initially and that
// carries an empty ordering. Variants generated by the GenerateStreamingSetOp
// exploration rule are the non-canonical counterpart.
//
// The answer is whatever private.Ordering.Any() returns.
func (c *CustomFuncs) IsCanonicalSetOp(private *memo.SetPrivate) bool {
	canonical := private.Ordering.Any()
	return canonical
}

// GenerateStreamingSetOp explores alternative forms of a set operation that
// carry a more specific ordering on their columns. The candidate orderings come
// from the interesting orderings property derived for grp. See the
// GenerateStreamingSetOp rule.
//
// For every interesting ordering, one new expression of kind op (Union,
// Intersect, IntersectAll, Except or ExceptAll) is added to grp's memo group.
// The new expression reuses left and right as inputs and a copy of private
// whose Ordering field is replaced by the candidate ordering.
//
// The required parameter is not read by this function. If op is not one of the
// five operators listed above, the function panics with an assertion failure
// the first time it tries to add an expression (so not at all when there are
// no interesting orderings).
func (c *CustomFuncs) GenerateStreamingSetOp(
	grp memo.RelExpr,
	required *physical.Required,
	op opt.Operator,
	left memo.RelExpr,
	right memo.RelExpr,
	private *memo.SetPrivate,
) {
	for _, candidate := range ordering.DeriveInterestingOrderings(grp) {
		// The ordering of a streaming set operation has to mention every output
		// column. When the candidate covers only a subset, extend a copy of it
		// with the leftover columns (ascending), visited in the set's own
		// iteration order so the result is arbitrary but deterministic.
		streamOrd := candidate
		if rest := grp.Relational().OutputCols.Difference(candidate.ColSet()); !rest.Empty() {
			// Copy first so the ordering handed out by the interesting
			// orderings property is not modified by the appends below.
			streamOrd = candidate.Copy()
			rest.ForEach(func(col opt.ColumnID) {
				streamOrd.AppendCol(col, false /* descending */)
			})
		}

		// Same private as the matched expression, except for the ordering.
		variant := *private
		variant.Ordering = streamOrd

		// Build the expression matching the operator and add it to the group.
		switch op {
		case opt.UnionOp:
			c.e.mem.AddUnionToGroup(
				&memo.UnionExpr{Left: left, Right: right, SetPrivate: variant}, grp,
			)

		case opt.IntersectOp:
			c.e.mem.AddIntersectToGroup(
				&memo.IntersectExpr{Left: left, Right: right, SetPrivate: variant}, grp,
			)

		case opt.IntersectAllOp:
			c.e.mem.AddIntersectAllToGroup(
				&memo.IntersectAllExpr{Left: left, Right: right, SetPrivate: variant}, grp,
			)

		case opt.ExceptOp:
			c.e.mem.AddExceptToGroup(
				&memo.ExceptExpr{Left: left, Right: right, SetPrivate: variant}, grp,
			)

		case opt.ExceptAllOp:
			c.e.mem.AddExceptAllToGroup(
				&memo.ExceptAllExpr{Left: left, Right: right, SetPrivate: variant}, grp,
			)

		default:
			panic(errors.AssertionFailedf("unhandled set operation: %s", op.String()))
		}
	}
}
